Amazon SESの送信イベントを送信情報に基づき分割してS3バケット上に保存する
初めに
以前Amazon SESから送信したメールのログ(イベント)をS3バケットに格納する方法を紹介させていただきました。
現在ではVirtual Delivery Managerでログを確認できますが、より長期もしくは詳細に確認したい場合は引き続き変更セットを割り当てS3等にログを配布する必要があります。
同一の検証ID内ではイベント種別によってログの配布先を分離することができますが同一イベント同士で別々の配布先に渡すことができないためシステム毎に分けてファイル出力したい場合は何らか別の手段を用意する必要があります。
システムごとに利用アドレスが異なる場合検証IDをメールアドレス単位にすることで対応できますが、複数システムで共通のアドレスを使っている場合もありますし利用するアドレスによっては認証が手間であったりもします。
Kinesis Data Firehoseによる動的パーティショニング
Amazon Kinesis Data Firehoseによる動的パーティショニングと呼ばれる機能があり、こちらを用いることで配布先がS3の場合データの実際の値をもとにパーティションの値を決定することができます。
ただこの値を指定できるのはキープレフィックスとなるためバケット自体は別のものとすることができない点はご注意ください。
https://docs.aws.amazon.com/ja_jp/firehose/latest/dev/dynamic-partitioning.html
パーティショニングキーの作成でサポートされているメソッドは次のとおりです。
インライン解析 - このメソッドは、JSON 形式のデータレコードからパーティショニングするためのキーの抽出で、Amazon Kinesis Data Firehose 組み込みサポートメカニズムである jq パーサーを使用します。
AWS Lambda 関数 - このメソッドは、指定された AWS Lambda 関数を使用して、パーティショニングに必要なデータフィールドを抽出して返します 。
動的パーティショニングではFirehose組み込みのjqパーサ、もしくはLambdaによるパラメータ抽出・変換の2種類のいずれかが利用可能です。
jqパーサーでも十分な場合が多いと思いますがより複雑な処理をしたい場合やJSONではない場合はLambda関数を利用して加工する事になりそうです。
今回はデータソースがJSONかつシンプルな抽出ですのでjqパーサーを利用します。
追加料金
動的パーティショニングの利用には通常の配信に加え以下の追加料金が発生します(執筆時点での東京リージョンのもの)。
最新の情報は料金ページの確認をお願いいたします。
項目 | 金額 |
---|---|
配信された GB あたり | 0.032USD |
配信された 1,000 S3 オブジェクトあたり | 0.008USD |
JQ 処理、1 時間あたり (オプション) | 0.112USD |
バッファを900秒(=15分)にしておけば配信オブジェクトは1日辺りでも96*{{最大分割数}}
個、JQ処理は今回のシンプルな例であれば70ms弱/件だったので小規模なシステムであればそこまでかからないでしょうか?
ただ発生イベント量による部分は大きいと思いますので事前に予想外の料金とならないかは確認しておきましょう。
差し替え時の注意
既に配信ストリームが存在する場合、動的パーティショニングは既存の配信ストリームに対して有効化を行うことができず作り直す必要があります。
幸いAmazon SES変更セットには複数のイベント先の割り当てが可能ですので一旦動的パーティショニングが有無両立させて不要であれば以前の配信ストリームを削除という形が無難でしょう。
設定
利用するキーの決定
以下は以前別件の記事で利用した送信のログです。
今回は送信システム毎に異なるSMTPユーザ(IAMユーザ)を利用しているものと仮定を置いてそれを示す値であるses:caller-identity
の値を利用して分けてみます。
送信元IP毎に分類したいのであればses:source-ip
を利用するなど各環境事情に合わせて必要なキーを選択しましょう。
{ "eventType": "Delivery", "mail": { "timestamp": "2023-08-14T08:46:58.328Z", "source": "postfix@example.com", "sourceArn": "arn:aws:ses:ap-northeast-1:xxxxxxxxxxxx:identity/example.com", "sendingAccountId": "xxxxxxxxxxxx", "messageId": "01060189f33a1918-152cfea8-fd15-49d8-a778-e6445c97986c-000000", "destination": [ "receive@example.com" ], "headersTruncated": false, "headers": [ { "name": "Received", "value": "from ip-xxx-xxx-xxx-xxx.ap-northeast-1.compute.internal (ec2-43-207-xxx-xxx.ap-northeast-1.compute.amazonaws.com [43.207.xxx.xxx]) by email-smtp.amazonaws.com with SMTP (SimpleEmailService-d-RJ9XEHDO0) id tSZrKRumQhOCiKN7EFdt for receive@example.com; Mon, 14 Aug 2023 08:46:58 +0000 (UTC)" }, { "name": "Received", "value": "from mail.example.com (localhost [127.0.0.1]) by ip-xxx-xxx-xxx-xxx.ap-northeast-1.compute.internal (Postfix) with SMTP id 8C3C8CBABF3 for <receive@example.com>; Mon, 14 Aug 2023 08:46:44 +0000 (UTC)" }, { "name": "From", "value": "postfix@example.com" }, { "name": "To", "value": "receive@example.com" }, { "name": "Subject", "value": "from ses" }, { "name": "Message-Id", "value": "<20230814084650.8C3C8CBABF3@ip-xxx-xxx-xxx-xxx.ap-northeast-1.compute.internal>" }, { "name": "Date", "value": "Mon, 14 Aug 2023 08:46:44 +0000 (UTC)" } ], "commonHeaders": { "from": [ "postfix@example.com" ], "date": "Mon, 14 Aug 2023 08:46:44 +0000 (UTC)", "to": [ "receive@example.com" ], "messageId": "01060189f33a1918-152cfea8-fd15-49d8-a778-e6445c97986c-000000", "subject": "from ses" }, "tags": { "ses:operation": [ "SendSmtpEmail" ], "ses:configuration-set": [ "examplecom-configuration" ], "ses:source-ip": [ "43.207.xxx.xxx" ], "ses:from-domain": [ "example.com" ], "ses:caller-identity": [ "ses-smtp-user.20230808-xxxxx" ], "ses:outgoing-ip": [ "23.251.234.12" ] } }, "delivery": { "timestamp": "2023-08-14T08:47:31.069Z", "processingTimeMillis": 32741, "recipients": [ "receive@example.com" ], "smtpResponse": "250 OK cj0lrkkn6rgabtc1jrhn7e05idpcfeqaplkvnh81", "reportingMTA": "e234-12.smtp-out.ap-northeast-1.amazonses.com" } }
CloudFormationテンプレート
最初に記載した以前の記事のテンプレートを拡張する形で作成します。差分(追加)部分はハイライトで表示しています。
AWSTemplateFormatVersion: 2010-09-09 Metadata: AWS::CloudFormation::Interface: ParameterGroups: - Label: default: General Parameters: - NoColonDomain - LogExpirationInDays - Label: default: Amazon SNS Parameters: - BaounceFeedbackAddress - ComplaintFeedbackAddress Parameters: NoColonDomain: Description: The domain name without the "." ex) example.com -> examplecom Type: String LogExpirationInDays: Description: Log expiration (day) Type: Number BaounceFeedbackAddress: Description: E-mail address for bounce feedback. empty if not needed Type: String ComplaintFeedbackAddress: Description: E-mail address for complaint feedback. empty if not needed Type: String Conditions: ExistBounceFeedbackAddress: !Not [!Equals ["", !Ref BaounceFeedbackAddress]] ExistComplaintFeedbackAddress: !Not [!Equals ["", !Ref ComplaintFeedbackAddress]] Resources: #---------------------- #--- SES Configuration #---------------------- ConfigurationSet: Type: AWS::SES::ConfigurationSet Properties: Name: !Sub ${NoColonDomain}-configuration DeliveryOptions: TlsPolicy: REQUIRE MailLogDeliverEvent: Type: AWS::SES::ConfigurationSetEventDestination Properties: ConfigurationSetName: !Ref ConfigurationSet EventDestination: Name: !Sub ${NoColonDomain}-ses-log-deliver Enabled: True MatchingEventTypes: - delivery - reject - bounce - complaint KinesisFirehoseDestination: DeliveryStreamARN: !GetAtt [SESLogDeliver, Arn] IAMRoleARN: !GetAtt [SESMailDeliverRole, Arn] SESMailDeliverRole: Type: AWS::IAM::Role Properties: RoleName: !Sub ${NoColonDomain}-ses-role AssumeRolePolicyDocument: Version: 2012-10-17 Statement: Action: sts:AssumeRole Effect: Allow Principal: Service: ses.amazonaws.com Path: / Policies: - PolicyName: !Sub ${NoColonDomain}-ses-policy PolicyDocument: Version: 2012-10-17 Statement: Effect: Allow Action: - firehose:PutRecordBatch Resource: !GetAtt [SESLogDeliver, Arn] #---------------------- #--- S3 #---------------------- MailLogBucket: Type: AWS::S3::Bucket Properties: BucketName: !Sub ${NoColonDomain}-mail-log PublicAccessBlockConfiguration: BlockPublicAcls: True BlockPublicPolicy: True IgnorePublicAcls: True RestrictPublicBuckets: True BucketEncryption: ServerSideEncryptionConfiguration: - BucketKeyEnabled: True ServerSideEncryptionByDefault: SSEAlgorithm: AES256 LifecycleConfiguration: Rules: - Id: !Sub Delete-After-${LogExpirationInDays}Days ExpirationInDays: !Ref LogExpirationInDays Status: Enabled #---------------------- #--- Firehose #---------------------- SESLogDeliver: Type: AWS::KinesisFirehose::DeliveryStream Properties: DeliveryStreamName: !Sub ${NoColonDomain}-ses-log-deliver DeliveryStreamType: DirectPut ExtendedS3DestinationConfiguration: BucketARN: !GetAtt [MailLogBucket, Arn] BufferingHints: IntervalInSeconds: 900 SizeInMBs: 128 CloudWatchLoggingOptions: Enabled: True #NOTE: LogGroup側からRefで撮りたいが${SESLogDeliver}がLogGroup側で欲しく循環参照がかかるのでベタ書きする LogGroupName: !Sub /aws/kinesisfirehose/${NoColonDomain}-ses-log-deliver LogStreamName: S3Delivery Prefix: "!{partitionKeyFromQuery:userName}/!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/" PartitioningConfiguration: Enabled: True RetryOptions: DurationInSeconds: 300 ProcessingConfiguration: Enabled: True Processors: - Type: MetadataExtraction Parameters: - ParameterName: MetadataExtractionQuery ParameterValue: '{userName: .mail.tags."ses:caller-identity"[0]}' - ParameterName: JsonParsingEngine ParameterValue: JQ-1.6 CompressionFormat: GZIP ErrorOutputPrefix: "!{firehose:error-output-type}/" RoleARN: !GetAtt [FirehoseRole, Arn] FirehoseCWLogGroup: Type: AWS::Logs::LogGroup Properties: LogGroupName: !Sub /aws/kinesisfirehose/${SESLogDeliver} RetentionInDays: !Ref LogExpirationInDays FirehoseCWLogStream: Type: AWS::Logs::LogStream Properties: LogGroupName: !Ref FirehoseCWLogGroup LogStreamName: S3Delivery FirehoseRole: Type: AWS::IAM::Role Properties: RoleName: !Sub ${NoColonDomain}-firehose-role Path: / AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Action: sts:AssumeRole Effect: Allow Principal: Service: firehose.amazonaws.com Policies: - PolicyName: !Sub ${NoColonDomain}-firehose-policy PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - 's3:PutObject' Resource: !Sub ${MailLogBucket.Arn}/* #---------------------- #--- SNS #---------------------- BounceFeedbackTopic: Condition: ExistBounceFeedbackAddress Type: AWS::SNS::Topic Properties: TopicName: !Sub ${NoColonDomain}-bounce-feedback-topic DisplayName: !Sub ${NoColonDomain}-bounce-feedback-topic BounceFeedbackSubscription: Condition: ExistBounceFeedbackAddress Type: AWS::SNS::Subscription Properties: TopicArn: !Ref BounceFeedbackTopic Protocol: email Endpoint: !Ref BaounceFeedbackAddress ComplaintFeedbackTopic: Condition: ExistComplaintFeedbackAddress Type: AWS::SNS::Topic Properties: TopicName: !Sub ${NoColonDomain}-complaint-feedback-topic DisplayName: !Sub ${NoColonDomain}-complaint-feedback-topic ComplaintFeedbackSubscription: Condition: ExistComplaintFeedbackAddress Type: AWS::SNS::Subscription Properties: TopicArn: !Ref ComplaintFeedbackTopic Protocol: email Endpoint: !Ref ComplaintFeedbackAddress
残念ながらCloudFormationの挙動として動的パーティショニングを有効化した際に置換が行われるわけではなくエラーで止まるようなので、すでに以前の記事のテンプレートを適用している場合は別の論理IDで作成する等で意図的に別のリソースとして作成する必要があります。
動的パーティショニング部分の設定はマネジメントコンソールでみるとこのような形となっております。
動作確認
2ユーザ分SMTP用のユーザを準備して送信します。
メール送信自体は通常通り送信するだけですので省略しますが同じバッファ間隔(900秒)に収まるように送信しています。
時間をおいて配信先バケットを確認してみると別々のファイルと指定配布されていることが確認できます。
$ aws s3 ls s3://xxxxx-mail-log/ses-smtp-user.20231106-104205/2023/11/06/ 2023-11-06 12:51:54 894 examplecom-ses-dynamic-log-deliver-2-2023-11-06-03-29-19-917bfda5-190a-38f9-b39c-3bae3c65eaae.gz $ aws s3 ls s3://xxxxx-mail-log/ses-smtp-user.20231106-122830/2023/11/06/ 2023-11-06 12:51:54 892 examplecom-ses-dynamic-log-deliver-2-2023-11-06-03-30-22-769eb168-80b7-3ba7-87bc-71b47b35f6a3.gz
メトリクスを確認してもパーティションが2つに別れてることが確認できます。
終わりに
Amazon Kinesis Data Firehoseの動的パーティショニング機能を用いて値に応じてログを別のファイルに出力してみました。Lambda関数で処理を組み込んだりせずとも標準機能でできるのが魅力です。
権限的をどうしたいかや後続でどのような処理を行うかで事前に分けておくか、事後で一括で処理しつつその中で分けていくかは異なるかと思いますので実現したい事によって使い分けていきましょう。